[SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster #28412
Conversation
|
So you mean that for loading 1 GB of event logs, the HybridKVStore takes 63s while the current LevelDB takes 69s? |
|
@jiangxb1987 From my testing results, when loading a 1 GB file, HybridKVStore takes 23s to parse it (that means users only need to wait 23s to see the UI), while LevelDB takes 69s. The 63s is the total time of parsing the file and then transferring the data to LevelDB; the transfer accounts for the remaining 40s. Sorry for the confusion. |
|
@baohe-zhang Good to know the result, it sounds great! cc @rednaxelafx |
|
ok to test |
|
Test build #122105 has finished for PR 28412 at commit
|
|
Test build #122119 has finished for PR 28412 at commit
|
|
cc @vanzin FYI |
HeartSaVioR left a comment:
Thanks for the contribution. The concept looks great.
I've skimmed the implementation, and I get the feeling that it is a bit complicated because of the dual writes in the foreground and background. The complication is even exposed to FsHistoryProvider.
Given that the HybridKVStore is only used for loading event logs and is not further exposed for modification (AFAIK - please correct me if I'm missing something), I see a chance to simplify the logic: dump the in-memory KVStore to LevelDB (in the background) once loading into the in-memory KVStore is done. (At that point the KVStore should be read-only and reject writes.) We then wouldn't need to deal with dual writes on concurrent threads; we would only need to switch the KVStore correctly.
This wouldn't add latency to serving content - it just takes a few more seconds to write to LevelDB, in other words, a few more seconds of holding on to the memory usage. (Not sure how long that would be.)
What do you think?
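For illustration, here is a minimal sketch of that flow, assuming the class and method names below (they are hypothetical, not part of this PR): replay writes go only to the in-memory store, and a single background thread dumps the data and switches the serving store exactly once.

```scala
import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, LevelDB}

// Hypothetical sketch of the suggested simplification (not PR code).
class LoadThenDumpStore(inMemory: InMemoryStore, levelDB: LevelDB) {
  @volatile private var serving: KVStore = inMemory

  def getStore(): KVStore = serving

  // Called after event-log replay has completed; from this point the store is
  // treated as read-only, so there are no dual writes to coordinate.
  def startBackgroundDump(copyAll: (InMemoryStore, LevelDB) => Unit): Unit = {
    val dumper = new Thread(() => {
      try {
        copyAll(inMemory, levelDB) // copy every object class to LevelDB
        serving = levelDB          // switch only after the dump succeeded
      } catch {
        case _: Exception =>       // on failure, keep serving from memory
      }
    }, "kvstore-background-dump")
    dumper.setDaemon(true)
    dumper.start()
  }
}
```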
// compaction may touch the file(s) which app rebuild wants to read
// compaction wouldn't run in short interval, so try again...
logWarning(s"Exception occurred while rebuilding app $appId - trying again...")
lease.rollback()
lease.rollback() was enough to throw away the intermediate LevelDB KVStore in the temporary directory when it failed to load for any reason. That doesn't look like the case for the Hybrid KVStore.
I didn't quite understand this comment, could you elaborate more? In the current implementation, if any exception is thrown while migrating data to LevelDB, the hybrid KVStore will not switch to LevelDB, and its getStore() method will always return the in-memory KVStore.
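In other words (a rough sketch of the fallback described above, with illustrative field names rather than the PR's exact code):

```scala
// Serve from LevelDB only if the background migration completed successfully;
// otherwise keep returning the in-memory store.
def getStore(): KVStore = if (switchedToLevelDB) levelDB else inMemoryStore
```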
I missed that the instance will be dereferenced, my bad.
It might still be ideal to clean up the instance explicitly, as the instance may hold a huge amount of memory. Please note that this can happen in the middle of processing.
(The previous logic may not have cleaned up the instance completely either, so just my 2 cents.)
Hybrid kvstore is now cleaned up explicitly.
|
|
// TODO: Maybe need to do other check to see if there's enough memory to
// use inMemoryStore.
if (hybridKVStoreEnabled) {
I have a feeling that too many HybridKVStore implementation details are exposed here; they could be abstracted away if we had a proper interface for a disk-based, loading-only KVStore for the SHS.
E.g. suppose a new interface on top of KVStore receives a Lease on initialize(), and exposes commit() and rollback() to handle the implementation details for each condition.
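A hypothetical shape of such an interface (the names here are invented purely to illustrate the suggestion; note that Lease is an inner class of HistoryServerDiskManager and, as discussed below, is only visible within the history package):

```scala
// Sketch only: a loading-only, disk-backed KVStore facade for the SHS.
trait LoadingOnlyStore extends KVStore {
  // Receives the disk lease before event-log replay starts.
  def initialize(lease: HistoryServerDiskManager#Lease): Unit
  // Commits the lease once the data is safely on disk.
  def commit(): Unit
  // Rolls the lease back if loading or dumping fails for any reason.
  def rollback(): Unit
}
```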
Agree. I will address it.
I found it difficult to pass the Lease to the hybrid KVStore, since the Lease class is only visible within the history package.
The code related to creating a hybrid kvstore is now refactored as a function.
logWarning(s"Failed to switch to use LevelDb for app" +
  s" $appId / ${attempt.info.attemptId}")
levelDB.close()
throw e
- Here we should roll back the lease as well.
- The SHS assumes the KVStore is loaded properly when the caller method returns. If I understand correctly, the exception thrown here doesn't propagate to the caller method, which means the HybridKVStore should still serve the content one way or another - the in-memory store in this case. (Be cautious of the memory usage.)
Will address 1. For 2, if switching to LevelDB fails, the hybrid KVStore will keep using the in-memory KVStore as its underlying store.
1 is addressed.
.createWithDefault(true)

val HYBRID_KVSTORE_ENABLED = ConfigBuilder("spark.history.store.hybridKVStore.enabled")
  .version("3.0.1")
- This needs documentation describing the functionality, as well as a proper caution about memory usage; see the sketch after this list.
- "3.1.0" is the correct version as of now.
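Something along these lines, as a sketch of what the suggested change might look like (the doc text and default value here are illustrative, and the config was renamed later in the PR):

```scala
val HYBRID_KVSTORE_ENABLED = ConfigBuilder("spark.history.store.hybridKVStore.enabled")
  .doc("Whether to use a hybrid KVStore when parsing event logs. The hybrid " +
    "store writes to an in-memory store first, so enabling it trades extra " +
    "history server memory for faster event log loading.")
  .version("3.1.0")
  .booleanConf
  .createWithDefault(false)
```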
Will address it.
Addressed.
|
The concept seems good; a couple of high-level questions without me having looked at the detailed code. |
|
Also worth mentioning that memory usage should be kept under control; there's no restriction for now. |
|
Test build #122332 has finished for PR 28412 at commit
|
|
Let's first discuss the plan for addressing the major concerns in the comments, especially how to restrict the overall memory usage. I think that's a blocker for production use. |
|
@HeartSaVioR One way in my mind is to monitor the memory usage of the SHS. If the memory usage or event log size exceeds a threshold (e.g., over 50% of Xmx), we can use LevelDB to parse the event log instead of the hybrid KVStore. |
|
I'm not sure that's fairly simple to do. Concurrent loads of applications can happen in the SHS, right? The default value of |
|
Another way is to keep a thread-safe variable called availableMemory in FsHistoryProvider. The initial value can be set as a percentage of Xmx. When we parse a file via the hybrid KVStore, we subtract an approximate memory usage from availableMemory, and when the hybrid store switches to LevelDB, we add back this approximate memory usage. When availableMemory is below a threshold, we can disable the hybrid KVStore. |
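A rough sketch of that bookkeeping (illustrative only; the PR eventually added a HistoryServerMemoryManager along these lines):

```scala
import java.util.concurrent.atomic.AtomicLong

// Tracks an overall memory budget for hybrid-store parsing. If a log's
// approximate usage would exceed the budget, the caller falls back to LevelDB.
class HybridStoreMemoryBudget(maxUsage: Long) {
  private val currentUsage = new AtomicLong(0L)

  /** Try to reserve memory before parsing a log with the hybrid store. */
  def tryLease(approxUsage: Long): Boolean = {
    if (currentUsage.addAndGet(approxUsage) > maxUsage) {
      currentUsage.addAndGet(-approxUsage) // over budget: undo and fall back
      false
    } else {
      true
    }
  }

  /** Release the reservation once the hybrid store has switched to LevelDB. */
  def release(approxUsage: Long): Unit = currentUsage.addAndGet(-approxUsage)
}
```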
|
Test build #122333 has finished for PR 28412 at commit
|
|
Test build #122335 has finished for PR 28412 at commit
|
|
@tgravescs What I saw in FsHistoryProvider is that "spark.history.fs.numReplayThreads" is used to create a thread pool for mergeApplicationListing and compaction. These threads don't seem to be responsible for parsing a single complete event log, but I am not sure if my understanding is correct. In the hybrid KVStore, each instance has one writer thread that writes data to LevelDB. |
|
The idea is similar to HistoryServerDiskManager, so it makes sense in general. We may need to get concrete answers to these questions to move forward:
|
|
Test build #122553 has finished for PR 28412 at commit
|
|
@HeartSaVioR
|
|
Test build #122555 has finished for PR 28412 at commit
|
|
@HeartSaVioR @tgravescs
|
|
Thanks for the update. There's a case where it goes up close to 10x but not quite 10x, so 10x seems a safe factor to apply. |
val it = inMemoryStore.view(klass).closeableIterator()
while (it.hasNext()) {
  levelDB.write(it.next())
}
Drive-by comment, given I added something similar in an in-house patch:
Add a write(Iterator<E> values) to the KVStore - this should make this switch order(s) of magnitude faster.
Thanks for the information, I will try adding a write(Iterator<E> values) in this pr.
If this helps, this is what I had written up for LevelDB - for the in-memory store, the default list traversal + write is good enough:
@Override
public void write(List<?> values) throws Exception {
  Preconditions.checkArgument(values != null && !values.isEmpty(),
    "Non-empty values required.");
  // Group by class, in case there are values from different classes in the iterator
  // Typical usecase is for this to be a single class.
  for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
      values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {
    final Iterator<?> valueIter = entry.getValue().iterator();
    final Iterator<byte[]> serializedValueIter;
    {
      // serialize outside the synchronized block
      List<byte[]> list = new ArrayList<>(entry.getValue().size());
      for (Object value : entry.getValue()) {
        list.add(serializer.serialize(value));
      }
      serializedValueIter = list.iterator();
    }
    final Class<?> valueClass = entry.getKey();
    final LevelDBTypeInfo ti = getTypeInfo(valueClass);
    // Batching updates per type
    synchronized (ti) {
      final LevelDBTypeInfo.Index naturalIndex = ti.naturalIndex();
      final Collection<LevelDBTypeInfo.Index> indices = ti.indices();
      try (WriteBatch batch = db().createWriteBatch()) {
        while (valueIter.hasNext()) {
          final Object value = valueIter.next();
          assert serializedValueIter.hasNext();
          final byte[] serializedObject = serializedValueIter.next();
          Object existing;
          try {
            existing = get(naturalIndex.entityKey(null, value), valueClass);
          } catch (NoSuchElementException e) {
            existing = null;
          }
          PrefixCache cache = new PrefixCache(value);
          byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
          for (LevelDBTypeInfo.Index idx : indices) {
            byte[] prefix = cache.getPrefix(idx);
            idx.add(batch, value, existing, serializedObject, naturalKey, prefix);
          }
        }
        assert !serializedValueIter.hasNext();
        db().write(batch);
      }
    }
  }
}
Thanks, this is helpful!
Hi @mridulm, I adapted your code and used it for the in-memory store to LevelDB switching, but only saw a small improvement in switching time. I am not sure if something is wrong somewhere.
| log size, jobs and tasks per job | 2 jobs, 400 tasks per job | 10 jobs, 400 tasks per job | 50 jobs, 400 tasks per job | 100 jobs, 400 tasks per job | 200 jobs, 400 tasks per job | 500 jobs, 400 tasks per job | 1000 jobs, 400 tasks per job | 5 jobs, 100000 tasks per job |
|---|---|---|---|---|---|---|---|---|
| original switching time | 1s | 2s | 4s | 8s | 16s | 37s | 65s | 90s |
| switching time with write(Iterator iter) | 1s | 1s | 4s | 7s | 13s | 34s | 58s | 84s |
The code:
for (klass <- klassMap.keys().asScala) {
  val it = inMemoryStore.view(klass).closeableIterator()
  levelDB.write(it)
}
public <T> void write(Iterator<T> iter) throws Exception {
  Preconditions.checkArgument(iter != null, "Non-empty values required.");
  List<T> values = new ArrayList<>();
  iter.forEachRemaining(values::add);
  // Group by class, in case there are values from different classes in the iterator
  // Typical usecase is for this to be a single class.
  for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
      values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {
    final Iterator<?> valueIter = entry.getValue().iterator();
    final Iterator<byte[]> serializedValueIter;
    {
      // serialize outside the synchronized block
      List<byte[]> list = new ArrayList<>(entry.getValue().size());
      for (Object value : entry.getValue()) {
        list.add(serializer.serialize(value));
      }
      serializedValueIter = list.iterator();
    }
    final Class<?> valueClass = entry.getKey();
    final LevelDBTypeInfo ti = getTypeInfo(valueClass);
    // Batching updates per type
    synchronized (ti) {
      final LevelDBTypeInfo.Index naturalIndex = ti.naturalIndex();
      final Collection<LevelDBTypeInfo.Index> indices = ti.indices();
      try (WriteBatch batch = db().createWriteBatch()) {
        while (valueIter.hasNext()) {
          final Object value = valueIter.next();
          assert serializedValueIter.hasNext();
          final byte[] serializedObject = serializedValueIter.next();
          Object existing;
          try {
            existing = get(naturalIndex.entityKey(null, value), valueClass);
          } catch (NoSuchElementException e) {
            existing = null;
          }
          PrefixCache cache = new PrefixCache(value);
          byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
          for (LevelDBTypeInfo.Index idx : indices) {
            byte[] prefix = cache.getPrefix(idx);
            idx.add(batch, value, existing, serializedObject, naturalKey, prefix);
          }
        }
        assert !serializedValueIter.hasNext();
        db().write(batch);
      }
    }
  }
}
I think using multiple threads to write data to LevelDB might shorten the switching time, but it would introduce more overhead to the SHS.
It is a function of how loaded your disk is, the IOPS it can sustain, and the transactions LevelDB can do concurrently.
Makes sense. I am testing it on a Mac, which has an SSD, and the disk is not busy. I think the improvement might be more obvious on an HDD or a busy disk. @HeartSaVioR @tgravescs Do we need to add batch write support for LevelDB in this PR?
I don't think it's mandatory. You can file another issue as an "improvement" for this, but IMHO working on it is completely optional for you. I think we have already asked you to do quite a lot of things.
Sounds good. We can improve that afterward.
|
Yeah, 10x definitely seems safe, as most of the numbers are closer to 8x for zstd. I'm fine with leaving the current logic for the small files; we can always follow up with more enhancements to skip them later if we see that it's causing a lot of load. @HeartSaVioR I'm not sure if that is what you were agreeing with, or if your suggestion was to change it here to skip the small ones? I think you were OK either way but wanted to clarify. |
|
Yeah, I'm OK either way. I don't think either way would bring major issues in reality. |
HeartSaVioR left a comment:
LGTM as it is. I'll wait for @tgravescs to finalize the review.
|
retest this, please |
|
Adding batch update is a perf improvement - not required for the functionality being added. I mentioned it given I did it recently and a similar snippet was pretty much exactly what I had replaced :-)
|
|
Test build #125522 has finished for PR 28412 at commit
|
|
retest this, please |
|
Test build #125598 has finished for PR 28412 at commit
|
|
test this please |
|
Test build #125616 has finished for PR 28412 at commit
|
|
test this please |
|
Test build #125635 has finished for PR 28412 at commit
|
|
@tgravescs Do you plan another round of review, or is it OK as it is? |
|
looks good |
|
retest this, please |
|
Test build #125841 has finished for PR 28412 at commit
|
|
Thanks! Merged into master. |
|
Thanks a lot for your reviews! |
… of HybridStore

### What changes were proposed in this pull request?
The idea is to improve the performance of HybridStore by adding batch write support to LevelDB. #28412 introduces HybridStore. HybridStore will write data to InMemoryStore at first and use a background thread to dump data to LevelDB once the writing to InMemoryStore is completed. In the comments section of #28412, mridulm mentioned that using batch writing can improve the performance of this dumping process, and he wrote the code of writeAll().

### Why are the changes needed?
I did a comparison of the HybridStore switching time between one-by-one write and batch write on an HDD disk. When the disk is free, the batch write has around a 25% improvement, and when the disk is 100% busy, the batch write has a 7x - 10x improvement.

When the disk is at 0% utilization:

| log size, jobs and tasks per job | original switching time, with write() | switching time with writeAll() |
| ---------------------------------- | ------------------------------------- | ------------------------------ |
| 133m, 400 jobs, 100 tasks per job | 16s | 13s |
| 265m, 400 jobs, 200 tasks per job | 30s | 23s |
| 1.3g, 1000 jobs, 400 tasks per job | 136s | 108s |

When the disk is at 100% utilization:

| log size, jobs and tasks per job | original switching time, with write() | switching time with writeAll() |
| --------------------------------- | ------------------------------------- | ------------------------------ |
| 133m, 400 jobs, 100 tasks per job | 116s | 17s |
| 265m, 400 jobs, 200 tasks per job | 251s | 26s |

I also ran some write-related benchmarking tests in LevelDBBenchmark.java and measured the total time of writing 1024 objects. The tests were conducted when the disk was at 0% utilization.

| Benchmark test | with write(), ms | with writeAll(), ms |
| ------------------------ | ---------------- | ------------------- |
| randomUpdatesIndexed | 213.06 | 157.356 |
| randomUpdatesNoIndex | 57.869 | 35.439 |
| randomWritesIndexed | 298.854 | 229.274 |
| randomWritesNoIndex | 66.764 | 38.361 |
| sequentialUpdatesIndexed | 87.019 | 56.219 |
| sequentialUpdatesNoIndex | 61.851 | 41.942 |
| sequentialWritesIndexed | 94.044 | 56.534 |
| sequentialWritesNoIndex | 118.345 | 66.483 |

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested.

Closes #29149 from baohe-zhang/SPARK-32350.

Authored-by: Baohe Zhang <[email protected]>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
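For reference, a sketch of how the HybridStore switch loop can hand a whole batch to LevelDB instead of writing one object at a time, assuming a writeAll(values) method along the lines of the snippets quoted earlier in this thread (not the exact merged code):

```scala
import scala.collection.JavaConverters._

// klassMap, inMemoryStore and levelDB are the fields used in the earlier
// HybridStore snippets; this just drains each class into one batched call.
for (klass <- klassMap.keys().asScala) {
  val values = new java.util.ArrayList[Any]()
  val it = inMemoryStore.view(klass).closeableIterator()
  try {
    while (it.hasNext()) {
      values.add(it.next())
    }
  } finally {
    it.close()
  }
  if (!values.isEmpty()) {
    levelDB.writeAll(values) // single batched write per class
  }
}
```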
gatorsmile left a comment:
#28412 does not have any unit test case for verifying the new behaviors and configurations.
This is required for all feature PRs, even if the features are added to improve performance.
|
@gatorsmile Thanks for reminding me of that! I will add unit tests for these HybridStore-related PRs. |
…HistoryServerMemoryManager

### What changes were proposed in this pull request?
This pull request adds 2 test suites for the 2 new classes HybridStore and HistoryServerMemoryManager, which were created in #28412. This pull request also made some minor changes in these 2 classes to expose some variables for testing. Besides the 2 suites, this pull request adds a unit test in FsHistoryProviderSuite to test parsing logs with HybridStore.

### Why are the changes needed?
Unit tests are needed for new features.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests.

Closes #29509 from baohe-zhang/SPARK-31608-UT.

Authored-by: Baohe Zhang <[email protected]>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
What changes were proposed in this pull request?
Add a new class HybridStore to make the history server faster when loading event files. When rebuilding the application state from event logs, HybridStore will write data to InMemoryStore at first and use a background thread to dump data to LevelDB once the writing to InMemoryStore is completed. HybridStore makes content serving faster by using more memory. It's only safe to enable it when the cluster is not under heavy load.
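A simplified sketch of that lifecycle from the history provider's point of view (the setter, replay call, and listener below are assumptions made for illustration, not necessarily the PR's exact API; switchToLevelDB and getStore follow the names used in the review discussion):

```scala
// Sketch only - not the exact PR code.
val store = new HybridStore()
store.setLevelDB(levelDB)                         // disk store to dump into later (assumed setter)
replayEventLog(logPath, store)                    // replay writes land in the in-memory store
store.switchToLevelDB(listener, appId, attemptId) // background dump, then switch
// The UI is served from store.getStore(): the in-memory store until the dump
// finishes, LevelDB afterwards, or the in-memory store indefinitely if the
// dump fails.
```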
Why are the changes needed?
HybridStore can greatly reduce the event log loading time, especially for large log files. In general, it gives a 4x - 6x UI loading speed improvement for large log files. The detailed results are shown in the comments above.
Does this PR introduce any user-facing change?
This PR adds two new configs: spark.history.store.hybridStore.enabled and spark.history.store.hybridStore.maxMemoryUsage.
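For example, a history server deployment might enable it like this (a hypothetical snippet; the values are illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.history.store.hybridStore.enabled", "true")
  .set("spark.history.store.hybridStore.maxMemoryUsage", "2g")
```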
How was this patch tested?
A test suite for HybridStore is added. I also manually tested it on 3.1.0 on macOS.
This is a follow-up to the work done by Hieu Huynh in 2019.